package com.smile.cloud.order.service;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.smile.cloud.order.mapper.TransactionLogMapper;
import com.smile.cloud.order.model.TransactionLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener {

    @Resource
    private OrderService orderService;
    @Resource
    private TransactionLogMapper transactionLogMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        /**
         * 本地事务执行会有三种可能
         * 1、commit 成功
         * 2、rollback 失败
         * 3、unknow 网络等原因 blocker还未知事务状态
         *
         * COMMIT_MESSAGE 经过blocker二次确认，本地事务已提交，消费者可以去消费
         * ROLLBACK_MESSAGE 本地事务异常回滚，不提交本地事务，删除blocker消息，消费者不会去消费
         * UNKNOW 状态未知，本地事务回查，重新确认事务状态
         */
        LocalTransactionState state = LocalTransactionState.COMMIT_MESSAGE;
        try {
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            int result = orderService.createOrder(order, message.getTransactionId());
            if (result == 1) {
                state = LocalTransactionState.COMMIT_MESSAGE;
                log.info("本地事务已提交。{}", message.getTransactionId());
            }
            if (result == 0) {
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                log.info("本地事务未提交。{}", message.getTransactionId());
            }

        } catch (Exception e) {
            log.info("执行本地事务失败。{0}", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * 本地事务进行回查，只有 LocalTransactionState.UNKNOW 才会进行回查，重新确认事务状态
     *
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("开始回查本地事务状态。{}", messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();

        LambdaQueryWrapper<TransactionLog> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(TransactionLog::getId, transactionId);
        if (transactionLogMapper.selectCount(queryWrapper) > 0) {
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("结束本地事务状态查询：{}", state);
        return state;
    }
}
